# Changing this file requires running update_embedded_sources.R to rebuild sources and jars.

arrow_write_record_batch <- function(df, spark_version_number = NULL) {
  arrow_env_vars <- list()
  if (!is.null(spark_version_number) && spark_version_number < "3.0") {
    # Spark < 3 uses an old version of Arrow, so send data in the legacy format
    arrow_env_vars$ARROW_PRE_0_15_IPC_FORMAT <- 1
  }

  withr::with_envvar(arrow_env_vars, {
    # Set the local time zone on any POSIXt columns that don't have one set
    # https://github.com/sparklyr/sparklyr/issues/2439
    df[] <- lapply(df, function(x) {
      if (inherits(x, "POSIXt") && is.null(attr(x, "tzone"))) {
        attr(x, "tzone") <- Sys.timezone()
      }
      x
    })
    arrow::write_to_raw(df, format = "stream")
  })
}

arrow_record_stream_reader <- function(stream) {
  arrow::RecordBatchStreamReader$create(stream)
}

arrow_read_record_batch <- function(reader) reader$read_next_batch()

arrow_as_tibble <- function(record) as.data.frame(record)
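
# Illustrative round trip through the Arrow helpers above; a sketch that
# assumes the `arrow` package is installed, kept inside `if (FALSE)` so that
# sourcing this file does not execute it.
if (FALSE) {
  raw_stream <- arrow_write_record_batch(mtcars)
  reader <- arrow_record_stream_reader(raw_stream)
  batch <- arrow_read_record_batch(reader)
  arrow_as_tibble(batch) # back to a data.frame
}
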
#' A helper function to retrieve values from \code{spark_config()}
#'
#' @param config The configuration list from \code{spark_config()}
#' @param name The name of the configuration entry
#' @param default The default value to use when entry is not present
#'
#' @keywords internal
#' @export
spark_config_value <- function(config, name, default = NULL) {
  if (getOption("sparklyr.test.enforce.config", FALSE) && any(grepl("^sparklyr\\.", name))) {
    settings <- get("spark_config_settings")()
    if (!any(name %in% settings$name) &&
      !grepl("^sparklyr\\.shell\\.", name)) {
      stop("Config value '", name[[1]], "' not described in spark_config_settings()")
    }
  }

  name_exists <- name %in% names(config)
  if (!any(name_exists)) {
    name_exists <- name %in% names(options())
    if (!any(name_exists)) {
      value <- default
    } else {
      name_primary <- name[name_exists][[1]]
      value <- getOption(name_primary)
    }
  } else {
    name_primary <- name[name_exists][[1]]
    value <- config[[name_primary]]
  }

  if (is.language(value)) value <- rlang::as_closure(value)
  if (is.function(value)) value <- value()
  value
}

spark_config_integer <- function(config, name, default = NULL) {
  as.integer(spark_config_value(config, name, default))
}

spark_config_logical <- function(config, name, default = NULL) {
  as.logical(spark_config_value(config, name, default))
}
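
# Usage sketch: a value resolves from `config` first, then from `options()`,
# then falls back to the default; wrapped in `if (FALSE)` so it is not run
# when this file is sourced.
if (FALSE) {
  conf <- list(sparklyr.backend.timeout = 10)
  spark_config_value(conf, "sparklyr.backend.timeout", 30) # 10, from config
  spark_config_value(conf, "sparklyr.progress.interval", 3) # 3, the default
  spark_config_integer(conf, "sparklyr.backend.timeout", 30) # 10L
}
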
#' Check whether the connection is open
#'
#' @param sc \code{spark_connection}
#'
#' @keywords internal
#'
#' @export
connection_is_open <- function(sc) {
  UseMethod("connection_is_open")
}

read_bin <- function(con, what, n, endian = NULL) {
  UseMethod("read_bin")
}

#' @export
read_bin.default <- function(con, what, n, endian = NULL) {
  if (is.null(endian)) readBin(con, what, n) else readBin(con, what, n, endian = endian)
}

read_bin_wait <- function(con, what, n, endian = NULL) {
  sc <- con
  con <- if (!is.null(sc$state) && identical(sc$state$use_monitoring, TRUE)) sc$monitoring else sc$backend

  timeout <- spark_config_value(sc$config, "sparklyr.backend.timeout", 30 * 24 * 60 * 60)
  progressInterval <- spark_config_value(sc$config, "sparklyr.progress.interval", 3)

  result <- if (is.null(endian)) readBin(con, what, n) else readBin(con, what, n, endian = endian)

  progressTimeout <- Sys.time() + progressInterval
  if (is.null(sc$state$progress)) {
    sc$state$progress <- new.env()
  }
  progressUpdated <- FALSE

  waitInterval <- 0
  commandStart <- Sys.time()
  while (length(result) == 0 && commandStart + timeout > Sys.time()) {
    Sys.sleep(waitInterval)
    waitInterval <- min(0.1, waitInterval + 0.01)

    result <- if (is.null(endian)) readBin(con, what, n) else readBin(con, what, n, endian = endian)

    if (Sys.time() > progressTimeout) {
      progressTimeout <- Sys.time() + progressInterval
      if (exists("connection_progress")) {
        connection_progress(sc)
        progressUpdated <- TRUE
      }
    }
  }

  if (progressUpdated) connection_progress_terminated(sc)

  if (commandStart + timeout <= Sys.time()) {
    stop("Operation timed out, increase config option sparklyr.backend.timeout if needed.")
  }

  result
}

read_bin.spark_connection <- function(con, what, n, endian = NULL) {
  read_bin_wait(con, what, n, endian)
}

#' @export
read_bin.spark_worker_connection <- function(con, what, n, endian = NULL) {
  read_bin_wait(con, what, n, endian)
}

#' @export
read_bin.livy_backend <- function(con, what, n, endian = NULL) {
  read_bin.default(con$rc, what, n, endian)
}

readObject <- function(con) {
  # Read type first
  type <- readType(con)
  readTypedObject(con, type)
}

readTypedObject <- function(con, type) {
  switch(type,
    "i" = readInt(con),
    "c" = readString(con),
    "b" = readBoolean(con),
    "d" = readDouble(con),
    "r" = readRaw(con),
    "D" = readDate(con),
    "t" = readTime(con),
    "a" = readArray(con),
    "l" = readList(con),
    "e" = readMap(con),
    "s" = readStruct(con),
    "f" = readStringArray(con),
    "n" = NULL,
    "j" = getJobj(con, readString(con)),
    "J" = jsonlite::fromJSON(
      readString(con),
      simplifyDataFrame = FALSE, simplifyMatrix = FALSE
    ),
    stop(paste("Unsupported type for deserialization", type))
  )
}
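
# Note: these single-character type tags mirror the write-side mapping in
# writeType() further down in this file; the two switches must stay in sync.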

readString <- function(con) {
  stringLen <- readInt(con)

  string <- (
    if (stringLen > 0) {
      raw <- read_bin(con, raw(), stringLen, endian = "big")
      if (is.element("00", raw)) {
        warning("Input contains embedded nuls, removing.")
        raw <- raw[raw != "00"]
      }
      rawToChar(raw)
    } else if (stringLen == 0) {
      ""
    } else {
      NA_character_
    }
  )

  Encoding(string) <- "UTF-8"
  string
}

readStringArray <- function(con) {
  # the backend joins string arrays into a single string, using the 0x19
  # (end-of-medium) control character as the separator; "<NA>" marks missing
  # values
  joined <- readString(con)
  arr <- as.list(strsplit(joined, "\u0019")[[1]])
  lapply(
    arr,
    function(x) {
      if (x == "<NA>") NA_character_ else x
    }
  )
}

readDateArray <- function(con, n = 1) {
  if (n == 0) {
    as.Date(NA)
  } else {
    do.call(c, lapply(seq(n), function(x) readDate(con)))
  }
}

readInt <- function(con, n = 1) {
  if (n == 0) {
    integer(0)
  } else {
    read_bin(con, integer(), n = n, endian = "big")
  }
}

readDouble <- function(con, n = 1) {
  if (n == 0) {
    double(0)
  } else {
    read_bin(con, double(), n = n, endian = "big")
  }
}

readBoolean <- function(con, n = 1) {
  if (n == 0) {
    logical(0)
  } else {
    as.logical(readInt(con, n = n))
  }
}

readType <- function(con) {
  rawToChar(read_bin(con, "raw", n = 1L))
}

readDate <- function(con) {
  n <- readInt(con)
  if (is.na(n)) {
    as.Date(NA)
  } else {
    d <- as.Date(n, origin = "1970-01-01")
    if (getOption("sparklyr.collect.datechars", FALSE)) {
      as.character(d)
    } else {
      d
    }
  }
}

readTime <- function(con, n = 1) {
  if (n == 0) {
    as.POSIXct(character(0))
  } else {
    t <- readDouble(con, n)

    r <- as.POSIXct(t, origin = "1970-01-01")
    if (getOption("sparklyr.collect.datechars", FALSE)) {
      as.character(r)
    } else {
      r
    }
  }
}

readArray <- function(con) {
  type <- readType(con)
  len <- readInt(con)

  if (type == "d") {
    return(readDouble(con, n = len))
  } else if (type == "i") {
    return(readInt(con, n = len))
  } else if (type == "b") {
    return(readBoolean(con, n = len))
  } else if (type == "t") {
    return(readTime(con, n = len))
  } else if (type == "D") {
    return(readDateArray(con, n = len))
  }

  if (len > 0) {
    l <- vector("list", len)
    for (i in seq_len(len)) {
      l[[i]] <- readTypedObject(con, type)
    }
    l
  } else {
    list()
  }
}

# Read a list. Types of each element may be different.
# Null objects are read as NA.
readList <- function(con) {
  len <- readInt(con)
  if (len > 0) {
    l <- vector("list", len)
    for (i in seq_len(len)) {
      elem <- readObject(con)
      if (is.null(elem)) {
        elem <- NA
      }
      l[[i]] <- elem
    }
    l
  } else {
    list()
  }
}

readMap <- function(con) {
  map <- list()
  len <- readInt(con)
  if (len > 0) {
    for (i in seq_len(len)) {
      key <- readString(con)
      value <- readObject(con)
      map[[key]] <- value
    }
  }

  map
}

# Convert a named list to struct so that
# SerDe won't confuse between a normal named list and struct
listToStruct <- function(list) {
  stopifnot(class(list) == "list")
  stopifnot(!is.null(names(list)))
  class(list) <- "struct"
  list
}
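
# Minimal illustration of the struct wrapper (not run when sourced):
if (FALSE) {
  s <- listToStruct(list(a = 1L, b = "x"))
  class(s) # "struct", serialized with the "s" type tag by writeType()
}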

# Read a field of StructType from DataFrame
# into a named list in R whose class is "struct"
readStruct <- function(con) {
  names <- readObject(con)
  fields <- readObject(con)
  names(fields) <- names
  listToStruct(fields)
}

readRaw <- function(con) {
  dataLen <- readInt(con)
  if (dataLen == -1) {
    NA
  } else if (dataLen == 0) {
    raw()
  } else {
    read_bin(con, raw(), as.integer(dataLen), endian = "big")
  }
}
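
# Sketch of reading a framed value from raw bytes: readInt() consumes a
# 4-byte big-endian integer, matching writeInt() below. Not run when sourced.
if (FALSE) {
  rc <- rawConnection(as.raw(c(0x00, 0x00, 0x00, 0x2a)))
  readInt(rc) # 42L
  close(rc)
}
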
sparklyr_gateway_trouble_shooting_msg <- function() {
  c(
    "\n\n\nTry running `options(sparklyr.log.console = TRUE)` followed by ",
    "`sc <- spark_connect(...)` for more debugging info."
  )
}

wait_connect_gateway <- function(gatewayAddress, gatewayPort, config, isStarting) {
  waitSeconds <- if (isStarting) {
    spark_config_value(config, "sparklyr.connect.timeout", 60)
  } else {
    spark_config_value(config, "sparklyr.gateway.timeout", 1)
  }

  gateway <- NULL
  commandStart <- Sys.time()

  while (is.null(gateway) && Sys.time() < commandStart + waitSeconds) {
    tryCatch(
      {
        suppressWarnings({
          timeout <- spark_config_value(config, "sparklyr.gateway.interval", 1)
          gateway <- socketConnection(
            host = gatewayAddress,
            port = gatewayPort,
            server = FALSE,
            blocking = TRUE,
            open = "rb",
            timeout = timeout
          )
        })
      },
      error = function(err) {
      }
    )

    startWait <- spark_config_value(config, "sparklyr.gateway.wait", 50 / 1000)
    Sys.sleep(startWait)
  }

  gateway
}

spark_gateway_commands <- function() {
  list(
    "GetPorts" = 0,
    "RegisterInstance" = 1
  )
}
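
# Shape of the GetPorts request as used by query_gateway_for_port() below:
# three big-endian ints (command id, session id, wait seconds) go out, and
# three ints (session id, redirect gateway port, backend port) are read back.
# A sketch of the outgoing frame, not run when this file is sourced:
if (FALSE) {
  rc <- rawConnection(raw(0), "r+")
  writeInt(rc, spark_gateway_commands()[["GetPorts"]])
  writeInt(rc, 42L) # session id
  writeInt(rc, 60L) # wait seconds
  length(rawConnectionValue(rc)) # 12 bytes
  close(rc)
}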

query_gateway_for_port <- function(gateway, sessionId, config, isStarting) {
  waitSeconds <- if (isStarting) {
    spark_config_value(config, "sparklyr.connect.timeout", 60)
  } else {
    spark_config_value(config, "sparklyr.gateway.timeout", 1)
  }

  writeInt(gateway, spark_gateway_commands()[["GetPorts"]])
  writeInt(gateway, sessionId)
  writeInt(gateway, if (isStarting) waitSeconds else 0)

  backendSessionId <- NULL
  redirectGatewayPort <- NULL

  commandStart <- Sys.time()
  while (length(backendSessionId) == 0 && commandStart + waitSeconds > Sys.time()) {
    backendSessionId <- readInt(gateway)
    Sys.sleep(0.1)
  }

  redirectGatewayPort <- readInt(gateway)
  backendPort <- readInt(gateway)

  if (length(backendSessionId) == 0 || length(redirectGatewayPort) == 0 || length(backendPort) == 0) {
    if (isStarting) {
      stop(
        "Sparklyr gateway did not respond while retrieving ports information after ",
        waitSeconds,
        " seconds.",
        sparklyr_gateway_trouble_shooting_msg()
      )
    } else {
      return(NULL)
    }
  }

  list(
    gateway = gateway,
    backendPort = backendPort,
    redirectGatewayPort = redirectGatewayPort
  )
}

spark_connect_gateway <- function(
                                  gatewayAddress,
                                  gatewayPort,
                                  sessionId,
                                  config,
                                  isStarting = FALSE) {

  # try connecting to existing gateway
  gateway <- wait_connect_gateway(gatewayAddress, gatewayPort, config, isStarting)

  if (is.null(gateway)) {
    if (isStarting) {
      stop(
        "Gateway in ", gatewayAddress, ":", gatewayPort, " did not respond.",
        sparklyr_gateway_trouble_shooting_msg()
      )
    }

    NULL
  } else {
    worker_log("is querying ports from backend using port ", gatewayPort)

    gateway_ports_query_attempts <- as.integer(
      spark_config_value(config, "sparklyr.gateway.port.query.attempts", 3L)
    )
    gateway_ports_query_retry_interval_s <- as.integer(
      spark_config_value(config, "sparklyr.gateway.port.query.retry.interval.seconds", 4L)
    )
    gatewayPortsQuery <- NULL
    while (gateway_ports_query_attempts > 0) {
      gateway_ports_query_attempts <- gateway_ports_query_attempts - 1
      tryCatch(
        {
          gatewayPortsQuery <- query_gateway_for_port(
            gateway,
            sessionId,
            config,
            isStarting
          )
          break
        },
        error = function(e) {
          # swallow the failure so the loop can retry; use `<<-` so the flag
          # change is visible to the checks below
          isStarting <<- FALSE
          if (gateway_ports_query_attempts > 0) {
            Sys.sleep(gateway_ports_query_retry_interval_s)
          }
        }
      )
    }
    if (is.null(gatewayPortsQuery) && !isStarting) {
      close(gateway)
      return(NULL)
    }

    redirectGatewayPort <- gatewayPortsQuery$redirectGatewayPort
    backendPort <- gatewayPortsQuery$backendPort

    worker_log("found redirect gateway port ", redirectGatewayPort)

    if (redirectGatewayPort == 0) {
      close(gateway)

      if (isStarting) {
        stop("Gateway in ", gatewayAddress, ":", gatewayPort, " does not have the requested session registered")
      }

      NULL
    } else if (redirectGatewayPort != gatewayPort) {
      close(gateway)

      spark_connect_gateway(gatewayAddress, redirectGatewayPort, sessionId, config, isStarting)
    } else {
      list(
        gateway = gateway,
        backendPort = backendPort
      )
    }
  }
}

core_invoke_sync_socket <- function(sc) {
  flush <- c(1)
  while (length(flush) > 0) {
    flush <- readBin(sc$backend, raw(), 1000)

    # while flushing monitored connections we don't want to hang forever
    if (identical(sc$state$use_monitoring, TRUE)) break
  }
}

core_invoke_sync <- function(sc) {
  # sleep until the connection is back in a valid state
  while (!core_invoke_synced(sc)) {
    Sys.sleep(1)
    core_invoke_sync_socket(sc)
  }
}

core_invoke_cancel_running <- function(sc) {
  if (is.null(spark_context(sc))) {
    return()
  }

  # if something fails while using a monitored connection we don't cancel jobs
  if (identical(sc$state$use_monitoring, TRUE)) {
    return()
  }

  # if something fails while we are already cancelling jobs, don't try to
  # cancel again; this can happen with OutOfMemory errors that shut down the
  # spark context
  if (identical(sc$state$cancelling_all_jobs, TRUE)) {
    return()
  }

  connection_progress_context(sc, function() {
    sc$state$cancelling_all_jobs <- TRUE
    on.exit(sc$state$cancelling_all_jobs <- FALSE)
    invoke(spark_context(sc), "cancelAllJobs")
  })

  if (exists("connection_progress_terminated")) connection_progress_terminated(sc)
}

write_bin_args <- function(backend, object, static, method, args, return_jobj_ref = FALSE) {
  rc <- rawConnection(raw(), "r+")
  writeString(rc, object)
  writeBoolean(rc, static)
  writeBoolean(rc, return_jobj_ref)
  writeString(rc, method)

  writeInt(rc, length(args))
  writeArgs(rc, args)
  bytes <- rawConnectionValue(rc)
  close(rc)

  rc <- rawConnection(raw(0), "r+")
  writeInt(rc, length(bytes))
  writeBin(bytes, rc)
  con <- rawConnectionValue(rc)
  close(rc)

  writeBin(con, backend)
}
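
# The framing above length-prefixes the serialized call so the backend can
# read one complete message off the socket. A sketch of inspecting the frame,
# not run when this file is sourced:
if (FALSE) {
  rc <- rawConnection(raw(0), "r+")
  write_bin_args(rc, "Handler", TRUE, "echo", list("hi"))
  frame <- rawConnectionValue(rc)
  close(rc)
  readBin(frame, integer(), 1, endian = "big") # payload length prefix
}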

core_invoke_synced <- function(sc) {
  if (is.null(sc)) {
    stop("The connection is no longer valid.")
  }

  backend <- core_invoke_socket(sc)
  echo_id <- "sparklyr"

  write_bin_args(backend, "Handler", TRUE, "echo", echo_id)

  returnStatus <- readInt(backend)

  if (length(returnStatus) == 0 || returnStatus != 0) {
    FALSE
  } else {
    object <- readObject(sc)
    identical(object, echo_id)
  }
}

core_invoke_socket <- function(sc) {
  if (identical(sc$state$use_monitoring, TRUE)) {
    sc$monitoring
  } else {
    sc$backend
  }
}

core_invoke_socket_name <- function(sc) {
  if (identical(sc$state$use_monitoring, TRUE)) {
    "monitoring"
  } else {
    "backend"
  }
}

core_remove_jobjs <- function(sc, ids) {
  core_invoke_method_impl(sc, static = TRUE, noreply = TRUE, "Handler", "rm", FALSE, as.list(ids))
}

core_invoke_method <- function(sc, static, object, method, return_jobj_ref, ...) {
  core_invoke_method_impl(sc, static, noreply = FALSE, object, method, return_jobj_ref, ...)
}

core_invoke_method_impl <- function(sc, static, noreply, object, method, return_jobj_ref, ...) {
  # N.B.: the reference to `object` must be retained until after a value or exception is returned to us
  # from the invoked method here (i.e., cannot have `object <- something_else` before that), because any
  # re-assignment could cause the last reference to `object` to be destroyed and the underlying JVM object
  # to be deleted from JVMObjectTracker before the actual invocation of the method could happen.
  lockBinding("object", environment())

  if (is.null(sc)) {
    stop("The connection is no longer valid.")
  }

  args <- list(...)

  # initialize status if needed
  if (is.null(sc$state$status)) {
    sc$state$status <- list()
  }

  # choose connection socket
  backend <- core_invoke_socket(sc)
  connection_name <- core_invoke_socket_name(sc)

  if (!identical(object, "Handler")) {
    toRemoveJobjs <- get_to_remove_jobjs(sc)
    objsToRemove <- ls(toRemoveJobjs)
    if (length(objsToRemove) > 0) {
      core_remove_jobjs(sc, objsToRemove)
      rm(list = objsToRemove, envir = toRemoveJobjs)
    }
  }

  if (!identical(object, "Handler") &&
    spark_config_value(sc$config, c("sparklyr.cancellable", "sparklyr.connection.cancellable"), TRUE)) {
    # if connection still running, sync to valid state
    if (identical(sc$state$status[[connection_name]], "running")) {
      core_invoke_sync(sc)
    }

    # while exiting this function, if interrupted (still running), cancel server job
    on.exit(core_invoke_cancel_running(sc))

    sc$state$status[[connection_name]] <- "running"
  }

  # if the object is a jobj then get its id
  objId <- if (inherits(object, "spark_jobj")) object$id else object

  write_bin_args(backend, objId, static, method, args, return_jobj_ref)

  if (identical(object, "Handler") &&
    (identical(method, "terminateBackend") || identical(method, "stopBackend"))) {
    # by the time we read response, backend might be already down.
    return(NULL)
  }

  result_object <- NULL
  if (!noreply) {
    # wait for a return status & result
    returnStatus <- readInt(sc)

    if (length(returnStatus) == 0) {
      # read the spark log
      msg <- core_read_spark_log_error(sc)

      withr::with_options(list(
        warning.length = 8000
      ), {
        stop(
          "Unexpected state in sparklyr backend: ",
          msg,
          call. = FALSE
        )
      })
    }

    if (returnStatus != 0) {
      # get error message from backend and report to R
      msg <- readString(sc)
      withr::with_options(list(
        warning.length = 8000
      ), {
        if (nzchar(msg)) {
          core_handle_known_errors(sc, msg)
        } else {
          # read the spark log
          msg <- core_read_spark_log_error(sc)
        }
      })
      spark_error(msg)
    }

    result_object <- readObject(sc)
  }

  sc$state$status[[connection_name]] <- "ready"
  on.exit(NULL)

  attach_connection(result_object, sc)
}

#' @export
jobj_subclass.shell_backend <- function(con) {
  "shell_jobj"
}

jobj_subclass.spark_connection <- function(con) {
  "shell_jobj"
}

#' @export
jobj_subclass.spark_worker_connection <- function(con) {
  "shell_jobj"
}

core_handle_known_errors <- function(sc, msg) {
  # Some systems might have an invalid hostname that Spark <= 2.0.1 fails to handle
  # gracefully and triggers unexpected errors such as #532. Under these versions,
  # we proactively test getLocalHost() to warn users of this problem.
  if (grepl("ServiceConfigurationError.*tachyon", msg, ignore.case = TRUE)) {
    warning(
      "Failed to retrieve localhost, please validate that the hostname is correctly mapped. ",
      "Consider running `hostname` and adding that entry to your `/etc/hosts` file."
    )
  } else if (grepl("check worker logs for details", msg, ignore.case = TRUE) &&
    spark_master_is_local(sc$master)) {
    abort_shell(
      "sparklyr worker rscript failure, check worker logs for details",
      NULL, NULL, sc$output_file, sc$error_file
    )
  }
}

core_read_spark_log_error <- function(sc) {
  # if there was no error message reported, then
  # return information from the Spark logs. return
  # all those with most recent timestamp
  msg <- "failed to invoke spark command (unknown reason)"
  try(silent = TRUE, {
    log <- readLines(sc$output_file)
    splat <- strsplit(log, "\\s+", perl = TRUE)
    n <- length(splat)
    timestamp <- splat[[n]][[2]]
    regex <- paste("\\b", timestamp, "\\b", sep = "")
    entries <- grep(regex, log, perl = TRUE, value = TRUE)
    pasted <- paste(entries, collapse = "\n")
    msg <- paste("failed to invoke spark command", pasted, sep = "\n")
  })
  msg
}


spark_error <- function(message) {
  option_name <- "sparklyr.simple.errors"
  simple_errors <- unlist(options(option_name))

  if (is.null(simple_errors)) {
    use_simple <- FALSE
  } else {
    use_simple <- simple_errors
  }

  if (use_simple) {
    stop(message, call. = FALSE)
  }

  split_message <- unlist(strsplit(message, "\n\t"))

  msg_l <- "\u001B]8;;"
  msg_r <- "\u001B\\"
  msg_fn <- "sparklyr::spark_last_error()"

  term <- Sys.getenv("TERM")
  color_terms <- c("xterm-color", "xterm-256color", "screen", "screen-256color")

  check_rstudio <- try(RStudio.Version(), silent = TRUE)

  in_rstudio <- TRUE
  if (inherits(check_rstudio, "try-error")) {
    in_rstudio <- FALSE
  }
  if (term %in% color_terms) {
    if (in_rstudio) {
      scheme <- "ide:run"
    } else {
      scheme <- "x-r-run"
    }

    msg_fun <- paste0(
      msg_l, scheme, ":", msg_fn, msg_r, "`", msg_fn, "`", msg_l, msg_r
    )
  } else {
    msg_fun <- paste0("`", msg_fn, "`")
  }

  last_err <- paste0(
    "Run ", msg_fun, " to see the full Spark error (multiple lines)"
  )

  option_msg <- paste(
    "To use the previous style of error message",
    "set `options(\"sparklyr.simple.errors\" = TRUE)`"
  )

  msg <- c(split_message[[1]], "", last_err, option_msg)

  #genv_set_last_error(message)

  rlang::abort(
    message = msg,
    use_cli_format = TRUE,
    call = NULL
  )
}

#' Surfaces the last error from Spark captured by internal `spark_error` function
#' @export
spark_last_error <- function() {
  last_error <- genv_get_last_error()
  if (!is.null(last_error)) {
    rlang::inform(last_error)
  } else {
    rlang::inform("No error found")
  }
}

#' Retrieve a Spark JVM Object Reference
#'
#' This S3 generic is used for accessing the underlying Java Virtual Machine
#' (JVM) Spark objects associated with \R objects. These objects act as
#' references to Spark objects living in the JVM. Methods on these objects
#' can be called with the \code{\link{invoke}} family of functions.
#'
#' @param x An \R object containing, or wrapping, a \code{spark_jobj}.
#' @param ... Optional arguments; currently unused.
#'
#' @seealso \code{\link{invoke}}, for calling methods on Java object references.
#'
#' @exportClass spark_jobj
#' @export
spark_jobj <- function(x, ...) {
  UseMethod("spark_jobj")
}

spark_jobj_id <- function(x) {
  x$id
}

#' @export
spark_jobj.default <- function(x, ...) {
  stop("Unable to retrieve a spark_jobj from object of class ",
    paste(class(x), collapse = " "),
    call. = FALSE
  )
}

#' @export
spark_jobj.spark_jobj <- function(x, ...) {
  x
}

#' @export
print.spark_jobj <- function(x, ...) {
  print_jobj(spark_connection(x), x, ...)
}

#' Generic method for printing a jobj for a connection type
#'
#' @param sc \code{spark_connection} (used for type dispatch)
#' @param jobj Object to print
#' @param ... Optional arguments passed on to methods
#'
#' @keywords internal
#'
#' @export
print_jobj <- function(sc, jobj, ...) {
  UseMethod("print_jobj")
}

get_valid_jobjs <- function(con) {
  if (is.null(con$state$validJobjs)) {
    con$state$validJobjs <- new.env(parent = emptyenv())
  }
  con$state$validJobjs
}

get_to_remove_jobjs <- function(con) {
  if (is.null(con$state$toRemoveJobjs)) {
    con$state$toRemoveJobjs <- new.env(parent = emptyenv())
  }
  con$state$toRemoveJobjs
}

# Check if jobj points to a valid external JVM object
isValidJobj <- function(jobj) {
  exists("connection", jobj) && exists(jobj$id, get_valid_jobjs(jobj$connection))
}

getJobj <- function(con, objId) {
  newObj <- jobj_create(con, objId)
  validJobjs <- get_valid_jobjs(con)
  validJobjs[[objId]] <- get0(objId, validJobjs, ifnotfound = 0) + 1

  newObj
}

jobj_subclass <- function(con) {
  UseMethod("jobj_subclass")
}

# Handler for a java object that exists on the backend.
jobj_create <- function(con, objId) {
  if (!is.character(objId)) {
    stop("object id must be a character")
  }
  # NOTE: We need a new env for a jobj as we can only register
  # finalizers for environments or external references pointers.
  obj <- structure(new.env(parent = emptyenv()), class = c("spark_jobj", jobj_subclass(con)))
  obj$id <- objId

  # Register a finalizer to remove the Java object when this reference
  # is garbage collected in R
  reg.finalizer(obj, cleanup.jobj)
  obj
}

jobj_info <- function(jobj) {
  if (!inherits(jobj, "spark_jobj")) {
    stop("'jobj_info' called on non-jobj")
  }

  class <- NULL
  repr <- NULL

  tryCatch(
    {
      class <- invoke(jobj, "getClass")
      if (inherits(class, "spark_jobj")) {
        class <- invoke(class, "getName")
      }
    },
    error = function(e) {
    }
  )
  tryCatch(
    {
      repr <- invoke(jobj, "toString")
    },
    error = function(e) {
    }
  )
  list(
    class = class,
    repr = repr
  )
}

jobj_inspect <- function(jobj) {
  print(jobj)
  if (!connection_is_open(spark_connection(jobj))) {
    return(jobj)
  }

  class <- invoke(jobj, "getClass")

  cat("Fields:\n")
  fields <- invoke(class, "getDeclaredFields")
  lapply(fields, function(field) {
    print(field)
  })

  cat("Methods:\n")
  methods <- invoke(class, "getDeclaredMethods")
  lapply(methods, function(method) {
    print(method)
  })

  jobj
}

cleanup.jobj <- function(jobj) {
  if (isValidJobj(jobj)) {
    objId <- jobj$id
    validJobjs <- get_valid_jobjs(jobj$connection)
    validJobjs[[objId]] <- validJobjs[[objId]] - 1

    if (validJobjs[[objId]] == 0) {
      rm(list = objId, envir = validJobjs)
      # NOTE: We cannot call removeJObject here as the finalizer may be run
      # in the middle of another RPC. Thus we queue up this object Id to be removed
      # and then run all the removeJObject when the next RPC is called.
      toRemoveJobjs <- get_to_remove_jobjs(jobj$connection)
      toRemoveJobjs[[objId]] <- 1
    }
  }
}

clear_jobjs <- function() {
  scons <- spark_connection_find()
  for (scon in scons) {
    validJobjs <- get_valid_jobjs(scon)
    valid <- ls(validJobjs)
    rm(list = valid, envir = validJobjs)

    toRemoveJobjs <- get_to_remove_jobjs(scon)
    removeList <- ls(toRemoveJobjs)
    rm(list = removeList, envir = toRemoveJobjs)
  }
}

attach_connection <- function(jobj, connection) {
  if (inherits(jobj, "spark_jobj")) {
    jobj$connection <- connection
  } else if (is.list(jobj) || inherits(jobj, "struct")) {
    jobj <- lapply(jobj, function(e) {
      attach_connection(e, connection)
    })
  } else if (is.environment(jobj)) {
    jobj <- eapply(jobj, function(e) {
      attach_connection(e, connection)
    })
  }

  jobj
}

# Utility functions to serialize R objects so they can be read in Java.

# nolint start
# Type mapping from R to Java
#
# NULL -> Void
# integer -> Int
# character -> String
# logical -> Boolean
# double, numeric -> Double
# raw -> Array[Byte]
# Date -> Date
# POSIXct,POSIXlt -> Timestamp
#
# list[T] -> Array[T], where T is one of above mentioned types
# environment -> Map[String, T], where T is a native type
# jobj -> Object, where jobj is an object created in the backend
# nolint end

getSerdeType <- function(object) {
  type <- class(object)[[1]]

  if (type != "list") {
    type
  } else {
    # Check if all elements are of same type
    elemType <- unique(sapply(object, function(elem) {
      getSerdeType(elem)
    }))
    if (length(elemType) <= 1) {
      # Check that there are no NAs in arrays since they are unsupported in Scala
      hasNAs <- any(is.na(object))

      if (hasNAs) {
        "list"
      } else {
        "array"
      }
    } else {
      "list"
    }
  }
}
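
# Examples of the dispatch above (not run when sourced):
if (FALSE) {
  getSerdeType(list(1L, 2L)) # "array": homogeneous, no NAs
  getSerdeType(list(1L, NA_integer_)) # "list": NAs are unsupported in arrays
  getSerdeType(list(1L, "a")) # "list": mixed element types
}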

writeObject <- function(con, object, writeType = TRUE) {
  type <- class(object)[[1]]

  if (type %in% c("integer", "character", "logical", "double", "numeric", "factor", "Date", "POSIXct")) {
    if (is.na(object) && !is.nan(object)) {
      object <- NULL
      type <- "NULL"
    }
  }

  serdeType <- getSerdeType(object)
  if (writeType) {
    writeType(con, serdeType)
  }
  switch(serdeType,
    NULL = writeVoid(con),
    integer = writeInt(con, object),
    character = writeString(con, object),
    logical = writeBoolean(con, object),
    double = writeDouble(con, object),
    numeric = writeDouble(con, object),
    raw = writeRaw(con, object),
    array = writeArray(con, object),
    list = writeList(con, object),
    struct = writeList(con, object),
    spark_jobj = writeJobj(con, object),
    environment = writeEnv(con, object),
    Date = writeDate(con, object),
    POSIXlt = writeTime(con, object),
    POSIXct = writeTime(con, object),
    factor = writeFactor(con, object),
    `data.frame` = writeList(con, object),
    spark_apply_binary_result = writeList(con, object),
    stop("Unsupported type '", serdeType, "' for serialization")
  )
}

writeVoid <- function(con) {
  # no value for NULL
}

writeJobj <- function(con, value) {
  if (!isValidJobj(value)) {
    stop("invalid jobj ", value$id)
  }
  writeString(con, value$id)
}

writeString <- function(con, value) {
  utfVal <- enc2utf8(as.character(value))
  writeInt(con, as.integer(nchar(utfVal, type = "bytes") + 1))
  writeBin(utfVal, con, endian = "big", useBytes = TRUE)
}

writeInt <- function(con, value) {
  writeBin(as.integer(value), con, endian = "big")
}

writeDouble <- function(con, value) {
  writeBin(value, con, endian = "big")
}

writeBoolean <- function(con, value) {
  # TRUE becomes 1, FALSE becomes 0
  writeInt(con, as.integer(value))
}

writeRaw <- function(con, batch) {
  writeInt(con, length(batch))
  writeBin(batch, con, endian = "big")
}

writeType <- function(con, class) {
  type <- switch(class,
    NULL = "n",
    integer = "i",
    character = "c",
    logical = "b",
    double = "d",
    numeric = "d",
    raw = "r",
    array = "a",
    list = "l",
    struct = "s",
    spark_jobj = "j",
    environment = "e",
    Date = "D",
    POSIXlt = "t",
    POSIXct = "t",
    factor = "c",
    `data.frame` = "l",
    spark_apply_binary_result = "l",
    stop("Unsupported type '", class, "' for serialization")
  )
  writeBin(charToRaw(type), con)
}

# Used to pass arrays where all the elements are of the same type
writeArray <- function(con, arr) {
  # TODO: Empty lists are given type "character" right now.
  # This may not work if the Java side expects array of any other type.
  if (length(arr) == 0) {
    elemType <- class("somestring")
  } else {
    elemType <- getSerdeType(arr[[1]])
  }

  writeType(con, elemType)
  writeInt(con, length(arr))

  if (length(arr) > 0) {
    for (a in arr) {
      writeObject(con, a, FALSE)
    }
  }
}

# Used to pass arrays where the elements can be of different types
writeList <- function(con, list) {
  writeInt(con, length(list))
  for (elem in list) {
    writeObject(con, elem)
  }
}

# Used to pass in hash maps required on Java side.
writeEnv <- function(con, env) {
  len <- length(env)

  writeInt(con, len)
  if (len > 0) {
    writeArray(con, as.list(ls(env)))
    vals <- lapply(ls(env), function(x) {
      env[[x]]
    })
    writeList(con, as.list(vals))
  }
}

writeDate <- function(con, date) {
  writeInt(con, as.integer(date))
}

writeTime <- function(con, time) {
  writeDouble(con, as.double(time))
}

writeFactor <- function(con, factor) {
  writeString(con, as.character(factor))
}

# Used to serialize in a list of objects where each
# object can be of a different type. Serialization format is
# <object type> <object> for each object
writeArgs <- function(con, args) {
  if (length(args) > 0) {
    for (a in args) {
      writeObject(con, a)
    }
  }
}
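
# Round trip through the SerDe using an in-memory connection; a sketch that
# relies only on the readers and writers defined in this file, kept inside
# `if (FALSE)` so sourcing does not execute it.
if (FALSE) {
  rc <- rawConnection(raw(0), "r+")
  writeObject(rc, list(1L, 2.5, TRUE))
  seek(rc, 0)
  readObject(rc) # list(1L, 2.5, TRUE)
  close(rc)
}
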
core_get_package_function <- function(packageName, functionName) {
  if (packageName %in% rownames(installed.packages()) &&
    exists(functionName, envir = asNamespace(packageName))) {
    get(functionName, envir = asNamespace(packageName))
  } else {
    NULL
  }
}

worker_config_serialize <- function(config) {
  paste(
    if (isTRUE(config$debug)) "TRUE" else "FALSE",
    spark_config_value(config, "sparklyr.worker.gateway.port", "8880"),
    spark_config_value(config, "sparklyr.worker.gateway.address", "localhost"),
    if (isTRUE(config$profile)) "TRUE" else "FALSE",
    if (isTRUE(config$schema)) "TRUE" else "FALSE",
    if (isTRUE(config$arrow)) "TRUE" else "FALSE",
    if (isTRUE(config$fetch_result_as_sdf)) "TRUE" else "FALSE",
    if (isTRUE(config$single_binary_column)) "TRUE" else "FALSE",
    if (isTRUE(config$spark_read)) "TRUE" else "FALSE",
    config$spark_version,
    sep = ";"
  )
}

worker_config_deserialize <- function(raw) {
  parts <- strsplit(raw, ";")[[1]]
  list(
    debug = as.logical(parts[[1]]),
    sparklyr.gateway.port = as.integer(parts[[2]]),
    sparklyr.gateway.address = parts[[3]],
    profile = as.logical(parts[[4]]),
    schema = as.logical(parts[[5]]),
    arrow = as.logical(parts[[6]]),
    fetch_result_as_sdf = as.logical(parts[[7]]),
    single_binary_column = as.logical(parts[[8]]),
    spark_read = as.logical(parts[[9]]),
    spark_version = parts[[10]]
  )
}
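
# Round trip of the ";"-separated worker config string (not run when sourced);
# the field order must match between the two functions above.
if (FALSE) {
  cfg <- worker_config_deserialize(
    worker_config_serialize(list(debug = FALSE, spark_version = "3.5.0"))
  )
  cfg$sparklyr.gateway.port # 8880L, the default worker gateway port
}
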
# nocov start

spark_worker_context <- function(sc) {
  hostContextId <- worker_invoke_method(sc, FALSE, "Handler", "getHostContext")
  worker_log("retrieved worker context id ", hostContextId)

  context <- structure(
    class = c("spark_jobj", "shell_jobj"),
    list(
      id = hostContextId,
      connection = sc
    )
  )
  worker_log("retrieved worker context")
  context
}

spark_worker_init_packages <- function(sc, context) {
  bundlePath <- worker_invoke(context, "getBundlePath")

  if (nchar(bundlePath) > 0) {
    bundleName <- basename(bundlePath)
    worker_log("using bundle name ", bundleName)

    workerRootDir <- worker_invoke_static(sc, "org.apache.spark.SparkFiles", "getRootDirectory")
    sparkBundlePath <- file.path(workerRootDir, bundleName)

    worker_log("using bundle path ", normalizePath(sparkBundlePath))

    if (!file.exists(sparkBundlePath)) {
      stop("failed to find bundle under SparkFiles root directory")
    }

    unbundlePath <- worker_spark_apply_unbundle(
      sparkBundlePath,
      workerRootDir,
      tools::file_path_sans_ext(bundleName)
    )

    .libPaths(unbundlePath)
    worker_log("updated .libPaths with bundle packages")
  } else {
    spark_env <- worker_invoke_static(sc, "org.apache.spark.SparkEnv", "get")
    spark_libpaths <- worker_invoke(worker_invoke(spark_env, "conf"), "get", "spark.r.libpaths", NULL)
    if (!is.null(spark_libpaths)) {
      spark_libpaths <- unlist(strsplit(spark_libpaths, split = ","))
      .libPaths(spark_libpaths)
    }
  }
}

spark_worker_execute_closure <- function(
                                         sc,
                                         closure,
                                         df,
                                         funcContext,
                                         grouped_by,
                                         barrier_map,
                                         fetch_result_as_sdf,
                                         partition_index) {
  if (nrow(df) == 0) {
    worker_log("found that source has no rows to be proceesed")
    return(NULL)
  }

  barrier_arg <- NULL
  worker_log("barrier is ", as.character(barrier_map))
  if (length(barrier_map) > 0) {
    worker_log("found barrier execution context")
    barrier_arg <- list(barrier = barrier_map)
  }
  closure_params <- length(formals(closure))
  has_partition_index_param <- (
    !is.null(funcContext$partition_index_param) &&
      nchar(funcContext$partition_index_param) > 0
  )
  if (has_partition_index_param) closure_params <- closure_params - 1
  closure_args <- c(
    list(df),
    if (!is.null(funcContext$user_context)) list(funcContext$user_context) else NULL,
    lapply(grouped_by, function(group_by_name) df[[group_by_name]][[1]]),
    barrier_arg
  )[0:closure_params]
  if (has_partition_index_param) {
    closure_args[[funcContext$partition_index_param]] <- partition_index
  }

  worker_log("computing closure")
  result <- do.call(closure, closure_args)
  worker_log("computed closure")

  if (identical(fetch_result_as_sdf, FALSE)) {
    serialize_impl <- spark_worker_get_serializer(sc)
    result <- lapply(result, function(x) serialize_impl(x, NULL))
    class(result) <- c("spark_apply_binary_result", class(result))
    result <- dplyr::tibble(spark_apply_binary_result = result)
  }

  if (!"data.frame" %in% class(result)) {
    worker_log("data.frame expected but ", class(result), " found")

    result <- as.data.frame(result)
  }

  result
}

spark_worker_clean_factors <- function(result) {
  if (any(sapply(result, is.factor))) {
    result <- as.data.frame(lapply(result, function(x) if (is.factor(x)) as.character(x) else x), stringsAsFactors = FALSE)
  }

  result
}

spark_worker_maybe_serialize_list_cols_as_json <- function(config, result) {
  if (identical(config$fetch_result_as_sdf, TRUE) &&
    config$spark_version >= "2.4.0" &&
    any(sapply(result, is.list))) {
    result <- do.call(
      dplyr::tibble,
      lapply(
        result,
        function(x) {
          if (is.list(x)) {
            x <- sapply(
              x,
              function(e) jsonlite::toJSON(e, auto_unbox = TRUE, digits = NA)
            )
            class(x) <- c(class(x), "list_col_as_json")
          }
          x
        }
      )
    )
  }

  result
}

spark_worker_apply_maybe_schema <- function(config, result) {
  if (identical(config$schema, TRUE)) {
    worker_log("updating schema")

    col_names <- colnames(result)
    types <- list()
    json_cols <- list()

    for (i in seq_along(result)) {
      if ("list_col_as_json" %in% class(result[[i]])) {
        json_cols <- append(json_cols, col_names[[i]])
        types <- append(types, "character")
      } else {
        types <- append(types, class(result[[i]])[[1]])
      }
    }

    result <- data.frame(
      names = paste(col_names, collapse = "|"),
      types = paste(types, collapse = "|"),
      json_cols = paste(json_cols, collapse = "|"),
      stringsAsFactors = FALSE
    )
  }

  result
}

spark_worker_build_types <- function(context, columns) {
  names <- names(columns)
  sqlutils <- worker_invoke(context, "getSqlUtils")
  fields <- worker_invoke(
    sqlutils,
    "createStructFields",
    lapply(
      names,
      function(name) {
        list(name, columns[[name]][[1]], TRUE)
      }
    )
  )

  worker_invoke(sqlutils, "createStructType", fields)
}

spark_worker_get_group_batch <- function(batch) {
  worker_invoke(
    batch, "get", 0L
  )
}

spark_worker_add_group_by_column <- function(df, result, grouped, grouped_by) {
  if (grouped) {
    if (nrow(result) > 0) {
      new_column_values <- lapply(grouped_by, function(grouped_by_name) df[[grouped_by_name]][[1]])
      names(new_column_values) <- grouped_by

      if ("AsIs" %in% class(result)) class(result) <- class(result)[-match("AsIs", class(result))]
      result <- do.call("cbind", list(new_column_values, result))

      names(result) <- gsub("\\.", "_", make.unique(names(result)))
    } else {
      result <- NULL
    }
  }

  result
}

get_arrow_converters <- function(context, config) {
  if (config$spark_version < "2.3.0") {
    stop("ArrowConverters is only supported for Spark 2.3 or above.")
  }

  worker_invoke(context, "getArrowConverters")
}

get_arrow_converters_impl <- function(context, config) {
  if (config$spark_version < "2.3.0") {
    stop("ArrowConverters is only supported for Spark 2.3 or above.")
  }

  worker_invoke(context, "getArrowConvertersImpl")
}

spark_worker_apply_arrow <- function(sc, config) {
  worker_log("using arrow serializer")

  context <- spark_worker_context(sc)
  spark_worker_init_packages(sc, context)

  deserialize_impl <- spark_worker_get_deserializer(sc)
  closure <- deserialize_impl(worker_invoke(context, "getClosure"))
  funcContext <- deserialize_impl(worker_invoke(context, "getContext"))
  grouped_by <- worker_invoke(context, "getGroupBy")
  grouped <- !is.null(grouped_by) && length(grouped_by) > 0
  columnNames <- worker_invoke(context, "getColumns")
  schema_input <- worker_invoke(context, "getSchema")
  time_zone <- worker_invoke(context, "getTimeZoneId")
  options_map <- worker_invoke(context, "getOptions")
  barrier_map <- as.list(worker_invoke(context, "getBarrier"))
  partition_index <- worker_invoke(context, "getPartitionIndex")

  if (grouped) {
    record_batch_raw_groups <- worker_invoke(context, "getSourceArray")
    record_batch_raw_groups_idx <- 1
    record_batch_raw <- spark_worker_get_group_batch(record_batch_raw_groups[[record_batch_raw_groups_idx]])
  } else {
    row_iterator <- worker_invoke(context, "getIterator")
    arrow_converters_impl <- get_arrow_converters_impl(context, config)
    record_batch_raw <- worker_invoke(
      arrow_converters_impl,
      "toBatchArray",
      row_iterator,
      schema_input,
      time_zone,
      as.integer(options_map[["maxRecordsPerBatch"]])
    )
  }

  reader <- arrow_record_stream_reader(record_batch_raw)
  record_entry <- arrow_read_record_batch(reader)

  all_batches <- list()
  total_rows <- 0

  schema_output <- NULL

  batch_idx <- 0
  while (!is.null(record_entry)) {
    batch_idx <- batch_idx + 1
    worker_log("is processing batch ", batch_idx)

    df <- arrow_as_tibble(record_entry)
    result <- NULL

    if (!is.null(df)) {
      colnames(df) <- columnNames[seq_along(colnames(df))]

      result <- spark_worker_execute_closure(
        sc,
        closure,
        df,
        funcContext,
        grouped_by,
        barrier_map,
        config$fetch_result_as_sdf,
        partition_index
      )

      result <- spark_worker_add_group_by_column(df, result, grouped, grouped_by)

      result <- spark_worker_clean_factors(result)

      result <- spark_worker_maybe_serialize_list_cols_as_json(config, result)

      result <- spark_worker_apply_maybe_schema(config, result)
    }

    if (!is.null(result)) {
      if (is.null(schema_output)) {
        schema_output <- spark_worker_build_types(context, lapply(result, class))
      }
      raw_batch <- arrow_write_record_batch(result, config$spark_version)

      all_batches[[length(all_batches) + 1]] <- raw_batch
      total_rows <- total_rows + nrow(result)
    }

    record_entry <- arrow_read_record_batch(reader)

    if (grouped && is.null(record_entry) && record_batch_raw_groups_idx < length(record_batch_raw_groups)) {
      record_batch_raw_groups_idx <- record_batch_raw_groups_idx + 1
      record_batch_raw <- spark_worker_get_group_batch(record_batch_raw_groups[[record_batch_raw_groups_idx]])

      reader <- arrow_record_stream_reader(record_batch_raw)
      record_entry <- arrow_read_record_batch(reader)
    }
  }

  if (length(all_batches) > 0) {
    worker_log("updating ", total_rows, " rows using ", length(all_batches), " row batches")

    arrow_converters <- get_arrow_converters(context, config)
    row_iter <- worker_invoke(arrow_converters, "fromPayloadArray", all_batches, schema_output)

    worker_invoke(context, "setResultIter", row_iter)
    worker_log("updated ", total_rows, " rows using ", length(all_batches), " row batches")
  } else {
    worker_log("found no rows in closure result")
  }

  worker_log("finished apply")
}

spark_worker_get_serializer <- function(sc) {
  serializer <- unserialize(worker_invoke(spark_worker_context(sc), "getSerializer"))
  if (is.list(serializer)) {
    function(x, ...) serializer$serializer(x)
  } else {
    serializer
  }
}

spark_worker_get_deserializer <- function(sc) {
  unserialize(worker_invoke(spark_worker_context(sc), "getDeserializer"))
}

spark_worker_apply <- function(sc, config) {
  context <- spark_worker_context(sc)
  spark_worker_init_packages(sc, context)

  grouped_by <- worker_invoke(context, "getGroupBy")
  grouped <- !is.null(grouped_by) && length(grouped_by) > 0
  if (grouped) worker_log("working over grouped data")

  length <- worker_invoke(context, "getSourceArrayLength")
  worker_log("found ", length, " rows")

  is_read <- ifelse(!is.null(config$spark_read), config$spark_read, FALSE)

  groups_func <- if (grouped) {
    "getSourceArrayGroupedSeq"
  } else if (is_read && config$spark_version >= "4") {
    "getSourceArraySeq2"
  } else {
    "getSourceArraySeq"
  }
  groups <- worker_invoke(context, groups_func)

  worker_log("retrieved ", length(groups), " rows")
  deserialize_impl <- spark_worker_get_deserializer(sc)

  closureRaw <- worker_invoke(context, "getClosure")
  closure <- deserialize_impl(closureRaw)

  funcContextRaw <- worker_invoke(context, "getContext")
  funcContext <- deserialize_impl(funcContextRaw)

  closureRLangRaw <- worker_invoke(context, "getClosureRLang")
  if (length(closureRLangRaw) > 0) {
    worker_log("found rlang closure")
    closureRLang <- spark_worker_rlang_unserialize()
    if (!is.null(closureRLang)) {
      closure <- closureRLang(closureRLangRaw)
      worker_log("created rlang closure")
    }
  }

  if (identical(config$schema, TRUE)) {
    worker_log("is running to compute schema")
  }

  columnNames <- worker_invoke(context, "getColumns")
  barrier_map <- as.list(worker_invoke(context, "getBarrier"))
  partition_index <- worker_invoke(context, "getPartitionIndex")

  if (!grouped) groups <- list(list(groups))

  all_results <- NULL

  for (group_entry in groups) {
    # serialized groups are wrapped over single lists
    data <- group_entry[[1]]
    if (config$spark_version >= "4" && !grouped && !is_read) {
      data <- lapply(data, unlist, recursive = FALSE)
    }
    df <- (
      if (config$single_binary_column) {
        dplyr::tibble(encoded = lapply(data, function(x) x[[1]]))
      } else {
        bind_rows <- core_get_package_function("dplyr", "bind_rows")
        as_tibble <- core_get_package_function("tibble", "as_tibble")
        if (!is.null(bind_rows) && !is.null(as_tibble)) {
          do.call(
            bind_rows,
            lapply(
              data, function(x) { as_tibble(x, .name_repair = "universal") }
            )
          )
        } else {
          warning("dplyr::bind_rows or dplyr::as_tibble is unavailable, ",
                  "falling back to rbind implementation in base R. ",
                  "Inputs with list column(s) will not work.")

          do.call(rbind.data.frame, c(data, list(stringsAsFactors = FALSE)))
        }
      })

    if (!config$single_binary_column) {
      # rbind removes Date classes so we re-assign them here
      if (length(data) > 0 && ncol(df) > 0 && nrow(df) > 0) {
        if (any(sapply(data[[1]], function(e) class(e)[[1]]) %in% c("Date", "POSIXct"))) {
          first_row <- data[[1]]
          for (idx in seq_along(first_row)) {
            first_class <- class(first_row[[idx]])[[1]]
            if (identical(first_class, "Date")) {
              df[[idx]] <- as.Date(df[[idx]], origin = "1970-01-01")
            } else if (identical(first_class, "POSIXct")) {
              df[[idx]] <- as.POSIXct(df[[idx]], origin = "1970-01-01")
            }
          }
        }

        # cast column to correct type, for instance, when dealing with NAs.
        for (i in seq_along(df)) {
          target_type <- funcContext$column_types[[i]]
          if (!is.null(target_type) && class(df[[i]]) != target_type) {
            df[[i]] <- do.call(paste("as", target_type, sep = "."), args = list(df[[i]]))
          }
        }
      }
    }

    colnames(df) <- columnNames[seq_along(colnames(df))]

    result <- spark_worker_execute_closure(
      sc,
      closure,
      df,
      funcContext,
      grouped_by,
      barrier_map,
      config$fetch_result_as_sdf,
      partition_index
    )

    result <- spark_worker_add_group_by_column(df, result, grouped, grouped_by)

    result <- spark_worker_clean_factors(result)

    result <- spark_worker_maybe_serialize_list_cols_as_json(config, result)

    result <- spark_worker_apply_maybe_schema(config, result)

    all_results <- rbind(all_results, result)
  }

  if (!is.null(all_results) && nrow(all_results) > 0) {
    worker_log("updating ", nrow(all_results), " rows")

    all_data <- lapply(seq_len(nrow(all_results)), function(i) as.list(all_results[i, ]))

    worker_invoke(context, "setResultArraySeq", all_data)
    worker_log("updated ", nrow(all_results), " rows")
  } else {
    worker_log("found no rows in closure result")
  }

  worker_log("finished apply")
}

spark_worker_rlang_unserialize <- function() {
  rlang_unserialize <- core_get_package_function("rlang", "bytes_unserialise")
  if (is.null(rlang_unserialize)) {
    core_get_package_function("rlanglabs", "bytes_unserialise")
  } else {
    rlang_unserialize
  }
}

spark_worker_unbundle_path <- function() {
  file.path("sparklyr-bundle")
}

#' Extracts a bundle of dependencies required by \code{spark_apply()}
#'
#' @param bundle_path Path to the bundle created using \code{spark_apply_bundle()}
#' @param base_path Base path to use while extracting bundles
#'
#' @keywords internal
#' @export
worker_spark_apply_unbundle <- function(bundle_path, base_path, bundle_name) {
  extractPath <- file.path(base_path, spark_worker_unbundle_path(), bundle_name)
  lockFile <- file.path(extractPath, "sparklyr.lock")

  if (!dir.exists(extractPath)) dir.create(extractPath, recursive = TRUE)

  if (length(dir(extractPath)) == 0) {
    worker_log("found that the unbundle path is empty, extracting:", extractPath)

    writeLines("", lockFile)
    system2("tar", c("-xf", bundle_path, "-C", extractPath))
    unlink(lockFile)
  }

  if (file.exists(lockFile)) {
    worker_log("found that lock file exists, waiting")
    while (file.exists(lockFile)) {
      Sys.sleep(1)
    }
    worker_log("completed lock file wait")
  }

  extractPath
}

# nocov end
# nocov start

spark_worker_connect <- function(
                                 sessionId,
                                 backendPort = 8880,
                                 config = list()) {
  gatewayPort <- spark_config_value(config, "sparklyr.worker.gateway.port", backendPort)

  gatewayAddress <- spark_config_value(config, "sparklyr.worker.gateway.address", "localhost")
  config <- list()

  worker_log("is connecting to backend using port ", gatewayPort)

  gatewayInfo <- spark_connect_gateway(gatewayAddress,
    gatewayPort,
    sessionId,
    config = config,
    isStarting = TRUE
  )

  worker_log("is connected to backend")
  worker_log("is connecting to backend session")

  tryCatch(
    {
      interval <- spark_config_value(config, "sparklyr.backend.interval", 1)

      backend <- socketConnection(
        host = "localhost",
        port = gatewayInfo$backendPort,
        server = FALSE,
        blocking = interval > 0,
        open = "wb",
        timeout = interval
      )

      class(backend) <- c(class(backend), "shell_backend")
    },
    error = function(err) {
      close(gatewayInfo$gateway)

      stop(
        "Failed to open connection to backend:", err$message
      )
    }
  )

  worker_log("is connected to backend session")

  sc <- structure(class = c("spark_worker_connection"), list(
    # spark_connection
    master = "",
    method = "shell",
    app_name = NULL,
    config = NULL,
    state = new.env(),
    # spark_shell_connection
    spark_home = NULL,
    backend = backend,
    gateway = gatewayInfo$gateway,
    output_file = NULL
  ))

  worker_log("created connection")

  sc
}

# nocov end
# nocov start
#' @export
connection_is_open.spark_worker_connection <- function(sc) {
  bothOpen <- FALSE
  if (!identical(sc, NULL)) {
    tryCatch(
      {
        bothOpen <- isOpen(sc$backend) && isOpen(sc$gateway)
      },
      error = function(e) {
      }
    )
  }
  bothOpen
}

worker_connection <- function(x, ...) {
  UseMethod("worker_connection")
}

worker_connection.spark_jobj <- function(x, ...) {
  x$connection
}

# nocov end
# nocov start

worker_invoke_method <- function(sc, static, object, method, ...) {
  core_invoke_method(sc, static, object, method, FALSE, ...)
}

worker_invoke <- function(jobj, method, ...) {
  UseMethod("worker_invoke")
}

#' @export
worker_invoke.shell_jobj <- function(jobj, method, ...) {
  worker_invoke_method(worker_connection(jobj), FALSE, jobj, method, ...)
}

worker_invoke_static <- function(sc, class, method, ...) {
  worker_invoke_method(sc, TRUE, class, method, ...)
}

worker_invoke_new <- function(sc, class, ...) {
  worker_invoke_method(sc, TRUE, class, "<init>", ...)
}

# nocov end
# nocov start

worker_log_env <- new.env()

worker_log_session <- function(sessionId) {
  assign("sessionId", sessionId, envir = worker_log_env)
}

worker_log_format <- function(message, session, level = "INFO", component = "RScript") {
  paste(
    format(Sys.time(), "%y/%m/%d %H:%M:%S"),
    " ",
    level,
    " sparklyr: ",
    component,
    " (",
    session,
    ") ",
    message,
    sep = ""
  )
}
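
# Example of the log line layout (not run when sourced); the timestamp shown
# is illustrative.
if (FALSE) {
  worker_log_format("is starting", session = 1234)
  # "24/05/01 10:30:00 INFO sparklyr: RScript (1234) is starting"
}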

worker_log_level <- function(..., level, component = "RScript") {
  if (is.null(worker_log_env$sessionId)) {
    worker_log_env <- get0("worker_log_env", envir = .GlobalEnv)
    if (is.null(worker_log_env$sessionId)) {
      return()
    }
  }

  args <- list(...)
  message <- paste(args, sep = "", collapse = "")
  formatted <- worker_log_format(message, worker_log_env$sessionId,
    level = level, component = component
  )
  cat(formatted, "\n")
}

worker_log <- function(...) {
  worker_log_level(..., level = "INFO")
}

worker_log_warning <- function(...) {
  worker_log_level(..., level = "WARN")
}

worker_log_error <- function(...) {
  worker_log_level(..., level = "ERROR")
}

# nocov end
# nocov start

.worker_globals <- new.env(parent = emptyenv())

spark_worker_main <- function(
                              sessionId,
                              backendPort = 8880,
                              configRaw = NULL) {
  spark_worker_hooks()
  tryCatch(
    {
      worker_log_session(sessionId)

      if (is.null(configRaw)) configRaw <- worker_config_serialize(list())

      config <- worker_config_deserialize(configRaw)

      if (identical(config$profile, TRUE)) {
        profile_name <- paste("spark-apply-", as.numeric(Sys.time()), ".Rprof", sep = "")
        worker_log("starting new profile in ", file.path(getwd(), profile_name))
        utils::Rprof(profile_name)
      }

      if (config$debug) {
        worker_log("exiting to wait for debugging session to attach")

        # sleep for 1 day to allow long debugging sessions
        Sys.sleep(60 * 60 * 24)
        return()
      }

      worker_log("is starting")

      options(sparklyr.connection.cancellable = FALSE)

      sc <- spark_worker_connect(sessionId, backendPort, config)
      worker_log("is connected")

      if (config$arrow) {
        spark_worker_apply_arrow(sc, config)
      } else {
        spark_worker_apply(sc, config)
      }

      if (identical(config$profile, TRUE)) {
        utils::Rprof(NULL)
        worker_log("closing profile")
      }
    },
    error = function(e) {
      worker_log_error("terminated unexpectedly: ", e$message)
      if (exists(".stopLastError", envir = .worker_globals)) {
        worker_log_error("collected callstack: \n", get(".stopLastError", envir = .worker_globals))
      }
      quit(status = -1)
    }
  )

  worker_log("finished")
}

spark_worker_hooks <- function() {
  unlock <- get("unlockBinding")
  lock <- get("lockBinding")

  originalStop <- stop
  unlock("stop", as.environment("package:base"))
  assign("stop", function(...) {
    frame_names <- list()
    frame_start <- max(1, sys.nframe() - 5)
    for (i in frame_start:sys.nframe()) {
      current_call <- sys.call(i)
      frame_names[[1 + i - frame_start]] <- paste(i, ": ", paste(head(deparse(current_call), 5), collapse = "\n"), sep = "")
    }

    assign(".stopLastError", paste(rev(frame_names), collapse = "\n"), envir = .worker_globals)
    originalStop(...)
  }, as.environment("package:base"))
  lock("stop", as.environment("package:base"))
}

# nocov end
do.call(spark_worker_main, as.list(commandArgs(trailingOnly = TRUE)))
